package main

import (
	"context"
	"encoding/base64"
	"encoding/binary"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"os/signal"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	lkpacer "github.com/livekit/mediatransportutil/pkg/pacer"
	lkauth "github.com/livekit/protocol/auth"
	livekitpb "github.com/livekit/protocol/livekit"
	lksdk "github.com/livekit/server-sdk-go/v2"
	webrtc "github.com/pion/webrtc/v4"

	"github.com/hajimehoshi/oto/v2"
	"github.com/joho/godotenv"
)

// int16ToBytes serializes PCM16 samples as little-endian bytes, two bytes per
// sample. The result is always non-nil, even for empty input.
func int16ToBytes(samples []int16) []byte {
	buf := make([]byte, 2*len(samples))
	dst := buf
	for _, s := range samples {
		binary.LittleEndian.PutUint16(dst, uint16(s))
		dst = dst[2:]
	}
	return buf
}

// main runs the livekit-speaker client. In order it:
//
//  1. loads configuration from .env, the environment and command-line flags;
//  2. mints an access token from LIVEKIT_API_KEY/SECRET when none was given;
//  3. optionally lists rooms/participants through the RoomService API;
//  4. opens a 16 kHz mono 16-bit oto output fed from an io.Pipe;
//  5. starts a jitter-buffer goroutine that queues received PCM frames, drops
//     the oldest ones when the backlog grows too large, and writes the rest
//     to the pipe;
//  6. connects to the room and forwards user data packets (optional 1-byte
//     sequence header followed by PCM16) into the jitter buffer;
//  7. optionally rejoins the room when data stops arriving;
//  8. blocks until SIGINT/SIGTERM and then closes the pipe.
func main() {
	// Load .env from current folder if present; a missing file is expected,
	// so the error is deliberately ignored.
	_ = godotenv.Load()

	// Flags/env: LIVEKIT_URL, TOKEN, ROOM (optional), TARGET_IDENTITY filter
	var (
		url             string
		token           string
		target          string
		room            string
		identity        string
		device          string
		inspect         bool
		watch           bool
		expectedSR      int
		beep            bool
		beepDur         float64
		pipeBeep        bool
		framesOnly      bool
		maxLatencyMs    int
		targetLatencyMs int
		enableDrop      bool
		autoRejoin      bool
		stallMs         int
	)
	flag.StringVar(&url, "url", os.Getenv("LIVEKIT_URL"), "LiveKit URL (wss://...) or env LIVEKIT_URL")
	flag.StringVar(&token, "token", os.Getenv("LIVEKIT_TOKEN"), "Access token or env LIVEKIT_TOKEN")
	flag.StringVar(&target, "target", os.Getenv("TARGET_IDENTITY"), "Subscribe only to this participant identity (optional)")
	flag.StringVar(&room, "room", os.Getenv("LIVEKIT_ROOM_NAME"), "Room name to join (defaults from LIVEKIT_ROOM_NAME)")
	flag.StringVar(&identity, "identity", os.Getenv("LIVEKIT_IDENTITY"), "Client identity (optional; autogenerated if empty)")
	flag.StringVar(&device, "device", "default", "Audio output device hint (ignored on macOS)")
	flag.BoolVar(&inspect, "inspect", false, "List rooms/participants via RoomService before connecting")
	flag.BoolVar(&watch, "watch", false, "Poll participants every 5s via RoomService and log")
	flag.IntVar(&expectedSR, "expected-sr", 16000, "Expected audio sample rate for throughput metrics (Hz)")
	flag.BoolVar(&beep, "beep", false, "Play a 1s 440Hz test tone on start to verify audio output")
	flag.Float64Var(&beepDur, "beep-dur", 1.0, "Test tone duration in seconds (with --beep)")
	flag.BoolVar(&pipeBeep, "pipe-beep", false, "Inject a 1s 440Hz test tone into the main player pipe")
	flag.BoolVar(&framesOnly, "frames-only", false, "Only log data packet frames; suppress periodic/status logs")
	flag.IntVar(&maxLatencyMs, "max-latency-ms", 250, "Maximum buffered latency before dropping (ms)")
	flag.IntVar(&targetLatencyMs, "target-latency-ms", 120, "Target latency after drops (ms)")
	flag.BoolVar(&enableDrop, "drop-old", true, "Enable dropping of oldest frames when backlog exceeds max-latency-ms")
	flag.BoolVar(&autoRejoin, "auto-rejoin-on-stall", true, "Automatically rejoin the room if no data arrives for stall-ms")
	flag.IntVar(&stallMs, "stall-ms", 2000, "No-data stall duration before triggering rejoin (ms)")
	flag.Parse()

	if url == "" {
		log.Fatalf("url is required (flag --url or env LIVEKIT_URL)")
	}
	// The drop policy trims the backlog down to the target once it exceeds the
	// maximum, so a target above the maximum would disable trimming entirely.
	if targetLatencyMs > maxLatencyMs {
		targetLatencyMs = maxLatencyMs
	}

	// If no token provided, mint one using API key/secret and room
	if token == "" {
		apiKey := os.Getenv("LIVEKIT_API_KEY")
		apiSecret := os.Getenv("LIVEKIT_API_SECRET")
		if apiKey == "" || apiSecret == "" {
			log.Fatalf("LIVEKIT_TOKEN not provided and LIVEKIT_API_KEY/SECRET missing; cannot mint token")
		}
		if room == "" {
			log.Fatalf("room is required to mint token (flag --room or env LIVEKIT_ROOM_NAME)")
		}
		if identity == "" {
			identity = fmt.Sprintf("speaker-%d", time.Now().UnixNano())
		}
		at := lkauth.NewAccessToken(apiKey, apiSecret)
		at.SetIdentity(identity)
		at.SetName(identity)
		at.AddGrant(&lkauth.VideoGrant{RoomJoin: true, Room: room})
		jwt, err := at.ToJWT()
		if err != nil {
			log.Fatalf("mint token: %v", err)
		}
		token = jwt
		log.Printf("joining as (minted): identity=%s room=%s", identity, room)
	} else {
		sub, name := decodeJWTIdentity(token)
		if sub != "" || name != "" {
			log.Printf("joining with provided token: sub=%s name=%s room=%s", sub, name, room)
		} else {
			log.Printf("joining with provided token (identity unknown pre-connect) room=%s", room)
		}
	}

	log.Printf("livekit-speaker: url=%s room=%s target=%q tokenLen=%d", url, room, target, len(token))

	// Optional: inspect via RoomService
	if inspect || watch {
		apiKey := os.Getenv("LIVEKIT_API_KEY")
		apiSecret := os.Getenv("LIVEKIT_API_SECRET")
		if apiKey == "" || apiSecret == "" {
			log.Printf("inspect/watch requested but LIVEKIT_API_KEY/SECRET missing; skipping REST")
		} else {
			rs := lksdk.NewRoomServiceClient(toHTTPBase(url), apiKey, apiSecret)
			// One-shot inspect. Wrapped in a function so the timeout context is
			// released as soon as the queries finish rather than at process exit.
			if inspect {
				func() {
					ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
					defer cancel()
					rooms, err := rs.ListRooms(ctx, &livekitpb.ListRoomsRequest{})
					if err != nil {
						log.Printf("ListRooms error: %v", err)
					} else {
						log.Printf("rooms: %d", len(rooms.Rooms))
						for _, r := range rooms.Rooms {
							log.Printf("- room: %s participants=%d", r.Name, r.NumParticipants)
						}
					}
					if room == "" {
						return
					}
					parts, err := rs.ListParticipants(ctx, &livekitpb.ListParticipantsRequest{Room: room})
					if err != nil {
						log.Printf("ListParticipants error: %v", err)
						return
					}
					log.Printf("participants in %s: %d", room, len(parts.Participants))
					for _, p := range parts.Participants {
						log.Printf("- %s tracks: audio=%t video=%t", p.Identity, hasAudio(p), hasVideo(p))
					}
				}()
			}
			// Watch loop: runs for the lifetime of the process.
			if watch && room != "" {
				go func() {
					ticker := time.NewTicker(5 * time.Second)
					defer ticker.Stop()
					for range ticker.C {
						ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
						parts, err := rs.ListParticipants(ctx, &livekitpb.ListParticipantsRequest{Room: room})
						cancel()
						if err != nil {
							log.Printf("watch: ListParticipants error: %v", err)
							continue
						}
						ids := make([]string, 0, len(parts.Participants))
						for _, p := range parts.Participants {
							ids = append(ids, p.Identity)
						}
						log.Printf("watch: participants=%v", ids)
					}
				}()
			}
		}
	}

	// Prepare audio output (16kHz mono, 16-bit)
	const outSampleRate = 16000
	const outChannels = 1
	otoCtx, ready, err := oto.NewContext(outSampleRate, outChannels, 2)
	if err != nil {
		log.Fatalf("oto.NewContext: %v", err)
	}
	<-ready

	// Optional: play a quick test tone to verify output path
	if beep {
		go func() {
			if err := playSine(otoCtx, 440.0, beepDur, outSampleRate, outChannels, 0.3); err != nil {
				log.Printf("beep error: %v", err)
			} else {
				log.Printf("beep played (%0.1fs at 440Hz)", beepDur)
			}
		}()
	}
	pr, pw := io.Pipe()
	player := otoCtx.NewPlayer(pr)
	player.Play()
	defer player.Close()
	if pipeBeep {
		go func() {
			// short delay to ensure player is running
			time.Sleep(200 * time.Millisecond)
			if err := writeSineTo(pw, 440.0, beepDur, outSampleRate, outChannels, 0.7); err != nil {
				log.Printf("pipe-beep error: %v", err)
			} else {
				log.Printf("pipe-beep played (%0.1fs at 440Hz)", beepDur)
			}
		}()
	}

	// pcw wraps the pipe so every frame written by the jitter buffer updates
	// the counters the status logger reads. Level logging is switched off in
	// frames-only mode, which promises to suppress periodic logs.
	levelEvery := 50
	if framesOnly {
		levelEvery = 0
	}
	pcw := &pcmWriter{w: pw, levelEvery: levelEvery}
	pcw.metrics.expectedBps = expectedSR * outChannels * 2
	if !framesOnly {
		// periodic status
		go func() {
			t := time.NewTicker(2 * time.Second)
			defer t.Stop()
			for range t.C {
				pcw.mu.Lock()
				last := pcw.lastWrite
				total := pcw.totalBytes
				frames := pcw.frames
				pcw.mu.Unlock()
				ago := "never"
				if !last.IsZero() {
					ago = time.Since(last).Truncate(100 * time.Millisecond).String()
				}
				// also print throughput snapshot (bytes per second)
				bps, pct := pcw.metrics.snapshot()
				log.Printf("audio status: frames=%d totalBytes=%d lastWrite=%s ago, rate=%0.1f kB/s (%d%%)", frames, total, ago, bps/1024.0, pct)
			}
		}()
		// 1s-rate printer
		go func() {
			t := time.NewTicker(1 * time.Second)
			defer t.Stop()
			for range t.C {
				bps, pct := pcw.metrics.roll()
				log.Printf("audio throughput: %0.1f kB/s (%d%% of expected)", bps/1024.0, pct)
			}
		}()
	}
	var pktCount uint64
	var lastPktAt int64 // unix nano of the most recent data packet; 0 means none yet
	// Per-sender sequence tracking to detect resets after reconnect
	lastSeq := make(map[string]uint8)
	lastRx := make(map[string]time.Time)
	type rxFrame struct {
		haveSeq bool
		seq     byte
		pcm     []byte
		at      time.Time
	}
	framesCh := make(chan rxFrame, 128)
	clearJB := make(chan struct{}, 4)
	// signalClear asks the jitter buffer to discard its queue. It never
	// blocks: if clear requests are already pending, another one is redundant.
	signalClear := func() {
		select {
		case clearJB <- struct{}{}:
		default:
		}
	}

	// Jitter buffer + drop policy + writer
	go func() {
		queue := make([]rxFrame, 0, 256)
		// bytes -> samples -> ms at 16 kHz mono 16-bit
		bytesToMs := func(n int) int {
			return (n / 2) * 1000 / outSampleRate
		}
		queuedBytes := func() int {
			total := 0
			for i := range queue {
				total += len(queue[i].pcm)
			}
			return total
		}
		droppedFrames := 0
		droppedBytes := 0
		lastDropLog := time.Now()
		for {
			// Drain inbound without blocking to keep queue up to date
			drained := false
		drainLoop:
			for {
				select {
				case f := <-framesCh:
					queue = append(queue, f)
					drained = true
				case <-clearJB:
					queue = queue[:0]
					droppedFrames = 0
					droppedBytes = 0
					if !framesOnly {
						log.Printf("jitter-buffer: cleared due to reconnect/participant change")
					}
				default:
					break drainLoop
				}
			}
			if drained && enableDrop {
				// Once the backlog exceeds the maximum, drop oldest frames until
				// it is at or below the target latency. The byte count is kept
				// as a running total so the queue is summed only once.
				backlog := queuedBytes()
				if bytesToMs(backlog) > maxLatencyMs {
					for len(queue) > 0 && bytesToMs(backlog) > targetLatencyMs {
						n := len(queue[0].pcm)
						droppedFrames++
						droppedBytes += n
						backlog -= n
						queue = queue[1:]
					}
				}
				if droppedFrames > 0 && time.Since(lastDropLog) > 500*time.Millisecond {
					if !framesOnly {
						log.Printf("drop-old: frames=%d bytes=%d newBacklog=%dms target=%dms", droppedFrames, droppedBytes, bytesToMs(backlog), targetLatencyMs)
					}
					droppedFrames = 0
					droppedBytes = 0
					lastDropLog = time.Now()
				}
			}
			if len(queue) == 0 {
				time.Sleep(2 * time.Millisecond)
				continue
			}
			// Pop head and write to audio device (blocking at device rate).
			// pcw.Write records the bytes in pcw.metrics, so no separate
			// addBytes call is needed here.
			f := queue[0]
			queue = queue[1:]
			if len(f.pcm) == 0 {
				continue
			}
			if err := writeAll(pcw, f.pcm); err != nil {
				// A pipe write fails only after one end has been closed, so
				// stop instead of spinning on a dead pipe.
				if !framesOnly {
					log.Printf("jitter-buffer: audio write failed: %v; stopping writer", err)
				}
				return
			}
		}
	}()
	trackCb := lksdk.ParticipantCallback{
		// Data-only mode: ignore all audio/video tracks entirely
		OnTrackSubscribed: func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
			if track.Kind() == webrtc.RTPCodecTypeAudio && !framesOnly {
				log.Printf("ignoring audio track from %s (data-only)", rp.Identity())
			}
		},
		OnTrackUnsubscribed: func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
			if !framesOnly {
				log.Printf("track unsubscribed: %s from %s", publication.Name(), rp.Identity())
			}
		},
		OnTrackMuted: func(pub lksdk.TrackPublication, p lksdk.Participant) {
			if !framesOnly {
				log.Printf("track muted: %s by %s", pub.Name(), p.Identity())
			}
		},
		OnTrackUnmuted: func(pub lksdk.TrackPublication, p lksdk.Participant) {
			if !framesOnly {
				log.Printf("track unmuted: %s by %s", pub.Name(), p.Identity())
			}
		},
		// Data receive path: payload format Uint8Array[seq, ...pcm16@16k].
		// NOTE(review): pktCount, lastSeq and lastRx are not synchronized;
		// this assumes the SDK invokes OnDataPacket serially — confirm.
		OnDataPacket: func(pkt lksdk.DataPacket, params lksdk.DataReceiveParams) {
			if target != "" && params.SenderIdentity != target {
				return
			}
			udp, ok := pkt.(*lksdk.UserDataPacket)
			if !ok {
				return
			}
			data := udp.Payload
			pktCount++
			if len(data) == 0 {
				return
			}
			// If length is odd, treat first byte as sequence number, rest as PCM16.
			// If even, accept headerless PCM16 for backward-compat.
			var (
				haveSeq bool
				seq     byte
				pcm     []byte
			)
			if len(data)%2 == 1 {
				haveSeq = true
				seq = data[0]
				pcm = data[1:]
			} else {
				pcm = data
			}
			// log per-packet details with seq (if present) and first few bytes of pcm
			preview := 8
			if len(pcm) < preview {
				preview = len(pcm)
			}
			if haveSeq {
				log.Printf("data frame #%d from=%s seq=%d bytes=%d pcmEven=%t pcmPreview=% x", pktCount, params.SenderIdentity, seq, len(pcm), len(pcm)%2 == 0, pcm[:preview])
				// Detect apparent stream reset: the sequence number jumped
				// backwards by more than 8. The 8-bit counter wraps 255 -> 0
				// in normal operation, so a packet that is only a short
				// distance ahead modulo 256 is treated as forward progress
				// across the wrap, not as a reset.
				if prev, ok := lastSeq[params.SenderIdentity]; ok {
					back := int(prev) - int(seq) // > 0 when the number went backwards
					fwd := seq - prev            // forward distance modulo 256 (uint8 arithmetic)
					if back > 8 && fwd > 8 {
						if !framesOnly {
							log.Printf("seq reset detected for %s (prev=%d -> seq=%d); clearing jitter buffer", params.SenderIdentity, prev, seq)
						}
						signalClear()
					}
				}
				lastSeq[params.SenderIdentity] = seq
				if tprev, ok := lastRx[params.SenderIdentity]; ok {
					if time.Since(tprev) > 800*time.Millisecond {
						if !framesOnly {
							log.Printf("rx stall detected for %s (>800ms); clearing jitter buffer", params.SenderIdentity)
						}
						signalClear()
					}
				}
				lastRx[params.SenderIdentity] = time.Now()
			} else {
				log.Printf("data frame #%d from=%s bytes=%d pcmEven=%t pcmPreview=% x", pktCount, params.SenderIdentity, len(pcm), len(pcm)%2 == 0, pcm[:preview])
			}
			// update last-packet timestamp
			atomic.StoreInt64(&lastPktAt, time.Now().UnixNano())
			if len(pcm) < 2 {
				return
			}
			// enforce even length for PCM16
			if len(pcm)%2 != 0 {
				pcm = pcm[:len(pcm)-1]
			}
			// Enqueue into jitter buffer for paced write/drop policy
			frame := rxFrame{haveSeq: haveSeq, seq: seq, pcm: pcm, at: time.Now()}
			select {
			case framesCh <- frame:
			default:
				// Channel full: evict the oldest queued frame to prefer fresher
				// audio. Both steps are non-blocking because the jitter-buffer
				// goroutine may empty or refill the channel in between; a
				// blocking receive or send here could stall this callback.
				select {
				case <-framesCh:
				default:
				}
				select {
				case framesCh <- frame:
				default:
				}
			}
		},
	}

	// Connect to room with pacer for smoother playout
	pf := lkpacer.NewPacerFactory(
		lkpacer.LeakyBucketPacer,
		lkpacer.WithBitrate(512_000),
		lkpacer.WithMaxLatency(100*time.Millisecond),
	)
	roomCallback := &lksdk.RoomCallback{
		ParticipantCallback: trackCb,
		OnDisconnected: func() {
			if !framesOnly {
				log.Printf("room disconnected")
			}
		},
		OnReconnecting: func() {
			if !framesOnly {
				log.Printf("room reconnecting...")
			}
		},
		OnReconnected: func() {
			if !framesOnly {
				log.Printf("room reconnected; clearing jitter buffer")
			}
			signalClear()
		},
		OnParticipantConnected: func(rp *lksdk.RemoteParticipant) {
			if !framesOnly {
				log.Printf("participant connected: %s (clear JB)", rp.Identity())
			}
			signalClear()
		},
		OnParticipantDisconnected: func(rp *lksdk.RemoteParticipant) {
			if !framesOnly {
				log.Printf("participant disconnected: %s (clear JB)", rp.Identity())
			}
			signalClear()
		},
		OnActiveSpeakersChanged: func(parts []lksdk.Participant) {
			if !framesOnly {
				ids := make([]string, 0, len(parts))
				for _, p := range parts {
					ids = append(ids, string(p.Identity()))
				}
				log.Printf("active speakers: %v", ids)
			}
		},
	}
	// AutoSubscribe=false to avoid subscribing to media tracks; we only use data packets
	lkRoom, err := lksdk.ConnectToRoomWithToken(url, token, roomCallback, lksdk.WithPacer(pf), lksdk.WithAutoSubscribe(false))
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	// roomMu guards lkRoom from here on: the stall watchdog replaces it after a
	// rejoin, and shutdown must disconnect whichever room is current.
	var roomMu sync.Mutex
	defer func() {
		roomMu.Lock()
		current := lkRoom
		roomMu.Unlock()
		current.Disconnect()
	}()

	log.Printf("connected: local=%s remotes=%d", lkRoom.LocalParticipant.Identity(), len(lkRoom.GetRemoteParticipants()))

	// Enumerate existing participants and publications. Done before the
	// watchdog starts so lkRoom is still owned by this goroutine alone.
	for _, rp := range lkRoom.GetRemoteParticipants() {
		id := string(rp.Identity())
		log.Printf("existing participant: %s (publications enumeration not available in this SDK version)", id)
	}

	// Stall watchdog: reconnect on no data activity
	if autoRejoin {
		go func() {
			ticker := time.NewTicker(500 * time.Millisecond)
			defer ticker.Stop()
			stallAfter := time.Duration(stallMs) * time.Millisecond
			for range ticker.C {
				// Compare the raw counter with 0: time.Unix(0, 0) is the Unix
				// epoch, not the zero time.Time, so IsZero would never report
				// "no packet yet" and the watchdog would fire immediately.
				lastNs := atomic.LoadInt64(&lastPktAt)
				if lastNs == 0 {
					continue
				}
				if time.Since(time.Unix(0, lastNs)) <= stallAfter {
					continue
				}
				log.Printf("watchdog: no data for %dms; reconnecting", stallMs)
				// Clear jitter buffer and reconnect room
				signalClear()
				roomMu.Lock()
				stale := lkRoom
				roomMu.Unlock()
				stale.Disconnect()
				time.Sleep(500 * time.Millisecond)
				for attempt := 1; attempt <= 5; attempt++ {
					r, err := lksdk.ConnectToRoomWithToken(url, token, roomCallback, lksdk.WithPacer(pf), lksdk.WithAutoSubscribe(false))
					if err == nil {
						roomMu.Lock()
						lkRoom = r
						roomMu.Unlock()
						// Restart the stall timer so the fresh connection gets a
						// full stall-ms window before the next rejoin.
						atomic.StoreInt64(&lastPktAt, time.Now().UnixNano())
						log.Printf("watchdog: rejoined on attempt %d", attempt)
						break
					}
					if attempt == 5 {
						log.Printf("watchdog: reconnect failed after %d attempts: %v", attempt, err)
					} else {
						time.Sleep(time.Duration(attempt) * 500 * time.Millisecond)
					}
				}
			}
		}()
	}

	// Handle shutdown
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	log.Printf("shutting down")
	// Closing the write side makes pending and future pipe writes fail, which
	// stops the jitter-buffer writer; PipeWriter.Close always returns nil.
	_ = pw.Close()
}

// pcmWriter wraps an io.Writer that receives PCM16 bytes. Its Write method
// counts successful writes, feeds the throughput metrics and periodically
// logs an average level.
type pcmWriter struct {
	w       io.Writer    // destination for the raw PCM bytes
	metrics audioMetrics // throughput window fed by Write

	// levelEvery selects how often a level line is logged: once every
	// levelEvery successful writes. Values <= 0 disable level logging.
	levelEvery int

	mu         sync.Mutex // guards the counters below
	frames     int        // number of successful writes
	totalBytes int64      // bytes accepted by w across successful writes
	lastWrite  time.Time  // time of the most recent successful write
}

// Write forwards b to the wrapped writer and returns that writer's result
// unchanged. On success it also bumps the frame/byte counters, records the
// write time, adds the bytes to the throughput metrics and, every levelEvery
// successful writes, logs the mean absolute sample value of b interpreted as
// little-endian PCM16.
func (p *pcmWriter) Write(b []byte) (int, error) {
	n, err := p.w.Write(b)
	if err != nil {
		return n, err
	}

	p.mu.Lock()
	p.frames++
	frame := p.frames
	p.totalBytes += int64(n)
	p.lastWrite = time.Now()
	p.mu.Unlock()
	p.metrics.addBytes(n)

	if p.levelEvery <= 0 || frame%p.levelEvery != 0 {
		return n, nil
	}
	samples := len(b) / 2
	if samples == 0 {
		// Nothing to measure; skipping also avoids logging 0/0 = NaN.
		return n, nil
	}
	var sum int64
	for i := 0; i < samples; i++ {
		// Widen before negating: -(-32768) does not fit in int16 and would
		// stay negative, pulling the average down.
		v := int64(int16(binary.LittleEndian.Uint16(b[2*i:])))
		if v < 0 {
			v = -v
		}
		sum += v
	}
	log.Printf("level avgAbs=%.0f", float64(sum)/float64(samples))
	return n, nil
}

// Data-only mode: no PCMRemoteTrack writer is used, and this file defines no
// pcm16ToOtoWriter adapter (media.PCM16Writer -> oto.Player bytes); all audio
// comes via data packets.

// writeAll writes all of b to w, calling Write repeatedly until every byte
// has been accepted.
//
// Bytes accepted by a call are always consumed, even when that call also
// returns an error, so a retry never resends data the writer already took.
// Errors whose message contains "interrupted" are retried; any other error is
// returned as-is. A call that accepts nothing and reports no error returns
// io.ErrShortWrite instead of looping forever.
func writeAll(w interface{ Write([]byte) (int, error) }, b []byte) error {
	for len(b) > 0 {
		n, err := w.Write(b)
		if n > 0 {
			b = b[n:]
		}
		if err != nil {
			if strings.Contains(err.Error(), "interrupted") {
				continue
			}
			return err
		}
		if n <= 0 {
			return io.ErrShortWrite
		}
	}
	return nil
}

// writeSineTo generates a PCM16 sine tone and writes it to w (in this program
// the same pipe that feeds the main player).
//
// freq is the tone frequency in Hz, durSec its duration in seconds (a value
// <= 0 writes nothing and returns nil), sampleRate the number of samples per
// second, and channels the number of interleaved channels, each carrying the
// same signal. volume scales the peak amplitude relative to full scale and is
// clamped to [-1, 1] so the peak always fits in an int16.
//
// It returns the first write error, if any.
func writeSineTo(w io.Writer, freq float64, durSec float64, sampleRate int, channels int, volume float64) error {
	if durSec <= 0 {
		return nil
	}
	// Converting an out-of-range float to int16 is implementation-dependent,
	// so bound the amplitude before converting.
	volume = math.Max(-1, math.Min(1, volume))
	n := int(float64(sampleRate) * durSec)
	amp := float64(int16(32767 * volume))

	// Generate the tone directly in interleaved form: every channel of a
	// sample frame receives the same value.
	samples := make([]int16, n*channels)
	for i := 0; i < n; i++ {
		phase := 2 * math.Pi * freq * float64(i) / float64(sampleRate)
		v := int16(amp * math.Sin(phase))
		for c := 0; c < channels; c++ {
			samples[i*channels+c] = v
		}
	}

	// Write in fixed-size chunks rather than one large call. The chunk size
	// is even, so a chunk boundary never splits a 16-bit sample.
	pcm := int16ToBytes(samples)
	const chunk = 4096
	for len(pcm) > 0 {
		end := chunk
		if end > len(pcm) {
			end = len(pcm)
		}
		if err := writeAll(w, pcm[:end]); err != nil {
			return err
		}
		pcm = pcm[end:]
	}
	return nil
}

// playSine generates a PCM16 sine tone and plays it on a dedicated oto player
// created from ctx.
//
// freq is the tone frequency in Hz, durSec its duration in seconds (a value
// <= 0 plays nothing and returns nil), sampleRate the number of samples per
// second, and channels the number of interleaved channels, each carrying the
// same signal. volume scales the peak amplitude relative to full scale and is
// clamped to [-1, 1] so the peak always fits in an int16.
//
// The call blocks for durSec plus 100ms and then closes the player, returning
// the error from that Close.
func playSine(ctx *oto.Context, freq float64, durSec float64, sampleRate int, channels int, volume float64) error {
	if durSec <= 0 {
		return nil
	}
	// Converting an out-of-range float to int16 is implementation-dependent,
	// so bound the amplitude before converting.
	volume = math.Max(-1, math.Min(1, volume))
	n := int(float64(sampleRate) * durSec)
	amp := float64(int16(32767 * volume))

	// Generate the tone directly in interleaved form (mono duplicated into
	// every requested channel).
	samples := make([]int16, n*channels)
	for i := 0; i < n; i++ {
		phase := 2 * math.Pi * freq * float64(i) / float64(sampleRate)
		v := int16(amp * math.Sin(phase))
		for c := 0; c < channels; c++ {
			samples[i*channels+c] = v
		}
	}

	// stream to player
	pr, pw := io.Pipe()
	// Closing the read side on return makes a still-pending pipe write fail,
	// so the feeder goroutine below cannot stay blocked forever if the player
	// stops reading before the whole tone was consumed.
	defer pr.Close()
	player := ctx.NewPlayer(pr)
	player.Play()
	go func() {
		// Best effort: a closed-pipe error here only means playback ended
		// before all bytes were consumed.
		_ = writeAll(pw, int16ToBytes(samples))
		_ = pw.Close()
	}()
	// small wait for the buffer to flush
	time.Sleep(time.Duration(durSec*1000)*time.Millisecond + 100*time.Millisecond)
	return player.Close()
}

// audioMetrics tracks the bytes recorded in the current measurement window
// and reports them as a throughput figure relative to an expected byte rate.
// Every method takes mu, so the methods may be called from several goroutines.
type audioMetrics struct {
	mu             sync.Mutex
	expectedBps    int       // expected throughput in bytes per second; <= 0 disables the percentage
	windowStart    time.Time // start of the current window; zero until the first addBytes or roll
	windowBytes    int       // bytes recorded since windowStart
	lastSampleTime time.Time // not read or written by any method in this block
}

// rateLocked converts a byte count observed over dur into bytes per second
// and into a percentage of expectedBps. The percentage is truncated toward
// zero and clamped to [0, 999]; it is 0 when expectedBps is not positive.
// A non-positive dur yields (0, 0). The caller must hold m.mu.
func (m *audioMetrics) rateLocked(bytes int, dur time.Duration) (bytesPerSec float64, pct int) {
	if dur <= 0 {
		return 0, 0
	}
	bytesPerSec = float64(bytes) / dur.Seconds()
	if m.expectedBps <= 0 {
		return bytesPerSec, 0
	}
	// Clamp while still in floating point so the int conversion never sees a
	// value outside the representable range.
	ratio := bytesPerSec / float64(m.expectedBps) * 100.0
	switch {
	case ratio > 999:
		pct = 999
	case ratio > 0:
		pct = int(ratio)
	}
	return bytesPerSec, pct
}

// addBytes records n bytes in the current window, starting the window on the
// first call.
func (m *audioMetrics) addBytes(n int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.windowStart.IsZero() {
		m.windowStart = time.Now()
	}
	m.windowBytes += n
}

// roll returns the rate of the window that just ended and starts a new, empty
// window. If no window had been started yet it starts one and returns (0, 0).
func (m *audioMetrics) roll() (bytesPerSec float64, pct int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	now := time.Now()
	if m.windowStart.IsZero() {
		m.windowStart = now
		return 0, 0
	}
	elapsed := now.Sub(m.windowStart)
	n := m.windowBytes
	// reset window
	m.windowStart = now
	m.windowBytes = 0
	return m.rateLocked(n, elapsed)
}

// snapshot returns the rate of the current window without resetting it. It
// returns (0, 0) if no window has been started yet.
func (m *audioMetrics) snapshot() (bytesPerSec float64, pct int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.windowStart.IsZero() {
		return 0, 0
	}
	return m.rateLocked(m.windowBytes, time.Since(m.windowStart))
}

// decodeJWTIdentity extracts the "sub" and "name" string claims from the
// payload (second dot-separated segment) of a JWT. The signature is NOT
// verified, so the result is only suitable for logging. Any decoding problem,
// or a claim that is missing or not a string, yields an empty string.
func decodeJWTIdentity(tok string) (sub, name string) {
	_, rest, found := strings.Cut(tok, ".")
	if !found {
		return "", ""
	}
	// The payload ends at the next dot, or at the end of the token if there
	// is none.
	segment, _, _ := strings.Cut(rest, ".")

	raw, err := base64.RawURLEncoding.DecodeString(segment)
	if err != nil {
		return "", ""
	}
	var claims map[string]any
	if json.Unmarshal(raw, &claims) != nil {
		return "", ""
	}
	// A failed type assertion leaves the zero value, i.e. the empty string.
	sub, _ = claims["sub"].(string)
	name, _ = claims["name"].(string)
	return sub, name
}

// toHTTPBase maps a WebSocket URL onto the matching HTTP scheme: "wss://"
// becomes "https://" and "ws://" becomes "http://". Any other input is
// returned unchanged, on the assumption that it is already an http(s) URL.
func toHTTPBase(wsURL string) string {
	switch {
	case strings.HasPrefix(wsURL, "wss://"):
		return "https://" + wsURL[len("wss://"):]
	case strings.HasPrefix(wsURL, "ws://"):
		return "http://" + wsURL[len("ws://"):]
	default:
		return wsURL
	}
}

// hasAudio reports whether the participant has at least one audio track that
// is not muted.
func hasAudio(p *livekitpb.ParticipantInfo) bool {
	for _, track := range p.Tracks {
		if track.Muted || track.Type != livekitpb.TrackType_AUDIO {
			continue
		}
		return true
	}
	return false
}

// hasVideo reports whether the participant has at least one video track that
// is not muted.
func hasVideo(p *livekitpb.ParticipantInfo) bool {
	for _, track := range p.Tracks {
		if track.Muted || track.Type != livekitpb.TrackType_VIDEO {
			continue
		}
		return true
	}
	return false
}
